package com.cantor.consumer.start;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.cantor.common.exception.IncorrectHostOrPortException;
import com.cantor.common.util.CantorUtil;
import com.cantor.consumer.future.FuturesKeeper;
import com.cantor.consumer.handler.CantorResponseMessageHandler;
import com.cantor.consumer.handler.PongHandler;
import com.cantor.consumer.handler.ReconnectHandler;
import com.cantor.consumer.netty.impl.ExponentialBackOffRetry;
import com.cantor.core.handler.CantorMessageCodec;
import com.cantor.core.handler.StickAndHalfPackageDecoder;
import com.cantor.core.message.CantorRequestMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * 负责管理所有与服务端的连接
 */
@Slf4j
public class ConsumerNettyKeeper {

    // Bootstrap
    private static final Bootstrap bootstrap;

    // EventLoopGroup
    private static final EventLoopGroup group;

    // 维护一个Map, 存放所有与provider的连接的channel
    private static final Map<String, Channel> connectionsMap;

    // 本机IP提前准备好
    private static final String localHost = CantorUtil.getExactLocalHost();

    // 初始化
    static {
        connectionsMap = new ConcurrentHashMap<>();
        group = new NioEventLoopGroup();
        final CantorResponseMessageHandler IN_RESPONSE_HANDLER = new CantorResponseMessageHandler(); // 业务处理
        final PongHandler IN_PONG_HANDLER = new PongHandler(); // pong心跳包处理(负责接收pong, 和写空闲事件处理)
        bootstrap = new Bootstrap().group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                .option(ChannelOption.TCP_NODELAY, true) // 直接发包
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Duplex
                        // pipeline.addLast(new LoggingHandler()); // 日志
                        pipeline.addLast(new IdleStateHandler(10L, 0, 0, TimeUnit.SECONDS)); // 读空闲处理器
                        pipeline.addLast(new CantorMessageCodec()); // 编解码器
                        // Out

                        // In
                        pipeline.addLast(new StickAndHalfPackageDecoder()); // 半包黏包处理器
                        pipeline.addLast(IN_PONG_HANDLER); // 心跳包处理
                        pipeline.addLast(IN_RESPONSE_HANDLER); // 业务处理器
                    }
                });
    }

    // 得到一个channel
    public static Channel getChannel(String host, Integer port) {
        // 检查host和port是否正确
        if (StrUtil.isEmpty(host) || null == port) {
            throw new IncorrectHostOrPortException();
        }
        // 处理host
        host = computeHost(host);
        String key = host + ":" + port;
        return connectionsMap.get(key);
    }

    // 增加一个对服务端的连接
    @SneakyThrows
    public static boolean addConnection(String host, Integer port) {
        // 1. 检查host和port是否正确
        if (StrUtil.isEmpty(host) || null == port) {
            throw new IncorrectHostOrPortException();
        }
        // 处理host
        host = computeHost(host);
        String key = host + ":" + port;

        // 2. 建立连接并加入map, 返回连接是否建立成功
        // 如果连接存在且可用, 直接返回true
        Channel channelGot = connectionsMap.get(key);
        if (ObjectUtil.isNotNull(channelGot) && channelGot.isActive()) {
            log.debug("Channel {} ------ {} 已经存在与Map中,且活跃,当前维护的连接数{}", channelGot.localAddress(), channelGot.remoteAddress(), connectionsMap.size());
            return true;
        }
        // 如果连接存在但不可用, 关闭连接, 移出map, 重新建立连接, 重新加入map
        if (ObjectUtil.isNotNull(channelGot) && !channelGot.isActive()) {
            log.debug("Channel {} ------ {} 已经存在与Map中,但无效,已移出并将重新加入map, 当前维护的连接数{}", channelGot.localAddress(), channelGot.remoteAddress(), connectionsMap.size());
            connectionsMap.remove(key).close();
        }
        // 3. 新Channel开始建立
        String tempHost = host; // 不知为何,必须要用新变量接收下面的listener中才能使用.
//        Channel channel = bootstrap.connect(host, port)
//                // Listener1: 给这个Channel添加重连器
//                .addListener(((ChannelFutureListener) f->{
//                    f.channel().pipeline().remove("IN_RECONNECT_HANDLER"); // 移除再重新添加重试处理器是因为要传入host和port
//                    f.channel().pipeline().addLast("IN_RECONNECT_HANDLER",new ReConnectHandler(tempHost,port,new ExponentialBackOffRetry(5, Integer.MAX_VALUE, 60 * 1000)));
//                }))
//                // Listener2: 根据连接情况做一些操作
//                .addListener((ChannelFutureListener) f -> {
//                    if (!f.isSuccess()) {
//                        log.error("连接失败,触发fireChannelInactive来重连");
//                        f.channel().pipeline().fireChannelInactive();
//                    }
//                })
//                .sync()
//                .channel();
        // 异步连接
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        // 添加重连处理器(价值在于其里面的channelInactive方法)
        channelFuture.channel().pipeline().addLast("IN_RECONNECT_HANDLER", new ReconnectHandler(tempHost, port, new ExponentialBackOffRetry(5000, Integer.MAX_VALUE, 60 * 1000)));
        // 等待
        Channel channel = channelFuture
                .addListener((ChannelFutureListener) f -> {
                    // 如果连接没成功,触发channelInactive
                    if (!f.isSuccess()) {
                        log.error("连接失败,触发fireChannelInactive来重连");
                        f.channel().pipeline().fireChannelInactive();
                    }
                })
                .sync()
                .channel();
        // 如果连接失败, 触发pipeline.fireChannelInActive(),并返回false
        if (!channel.isActive()) {
            log.error("连接至 {} 失败!", key);
            return false;
        }
        // 4. 存入ConnectionsMap
        connectionsMap.put(key, channel);

        // 5. 未来关闭
        channel.closeFuture().addListener(f -> log.info("[连接关闭] {} !!!!!! {}", channel.localAddress(), channel.remoteAddress()));

        // 6. 输出已经连接的channel情况
        showChannelInfo();

        // 7. 返回是否成功连接.
        return true;
    }

    // 对host进行处理, 防止是本机无法访问的情况
    private static String computeHost(String host) {
        return localHost.equals(host) ? "127.0.0.1" : host;
    }

    // 断开一个连接
    public static void closeConnection(String host, Integer port) {
    }

    // 发送请求数据, 发送后通知FuturesKeeper添加一个CompletableFuture
    public static CompletableFuture sendCantorRequestMessage(String host, Integer port, CantorRequestMessage message) {
        // 记录
        // recordLoadCount(host, port, message.getParameterValue());

        // CompletableFuture方案 (推荐,这种方式是juc原生)
        getChannel(host, port)
                .writeAndFlush(message)
                .addListener((ChannelFutureListener) cf -> log.debug("{} >>> Request >>> {}", cf.channel().localAddress(), cf.channel().remoteAddress()));
        return FuturesKeeper.record(message.getSequenceId(), new CompletableFuture());

        // Promise方案
//        Channel channel = getChannel(host, port);
//        channel.writeAndFlush(message);
//        return PromisesKeeper.record(message.getSequenceId(), new DefaultPromise(channel.eventLoop()));
    }

    // 输出Channel连接情况
    public static void showChannelInfo() {
        // 输出channel的连接情况.
        log.info("当前维护的channel数量: {}", connectionsMap.size());
        connectionsMap.values().forEach(ch -> {
            log.info("{} ======&====== {}", ch.localAddress(), ch.remoteAddress());
        });
    }


    // 记录向每个节点发送的数量(检验负载均衡)
    private static final Map<String, Integer> loadCount = new ConcurrentHashMap<>();

    private static void recordLoadCount(String host, Integer port, Object data) {
        host = computeHost(host);
        String key = host + ":" + port;
        if (loadCount.containsKey(key)) {
            loadCount.put(key, Convert.toInt(loadCount.get(key)) + 1);
        } else {
            loadCount.put(key, 1);
        }
    }

    // 输出负载情况
    public static void showLoadCount() {
        if (loadCount == null || loadCount.size() == 0) {
            System.err.println("没有记录负载情况(此功能已取消)");
            return;
        }
        System.err.println("负载情况如下:");
        loadCount.forEach((address, count) -> {
            System.err.println(StrUtil.format("{} 被调用 {} 次", address, count));
        });
    }

}
